1use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Arc, Mutex};
28
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::sync::mpsc;
31
32use super::{
33 BlockControllerRuntimeStatus, BlockInputUnion, Controller, STATUS_DONE, STATUS_INIT,
34 STATUS_RUNNING,
35};
36use super::health::HealthMonitor;
37use crate::backend::eventbus::EventBus;
38use crate::backend::storage::filestore::FileStore;
39use crate::backend::storage::wstore::WaveStore;
40use crate::backend::wps;
41
42pub const ACP_OUTPUT_SUBJECT: &str = "output";
44
45pub const BLOCK_CONTROLLER_ACP: &str = "acp";
47
48struct AcpInner {
50 proc_status: String,
51 proc_exit_code: i32,
52 status_version: i32,
53 session_id: Option<String>,
54 current_pid: Option<u32>,
55 stdin_tx: Option<mpsc::Sender<String>>,
56 kill_tx: Option<tokio::sync::oneshot::Sender<bool>>,
57 pending_prompt: Option<String>,
59}
60
61pub struct AcpController {
63 #[allow(dead_code)]
64 tab_id: String,
65 block_id: String,
66 inner: Arc<Mutex<AcpInner>>,
67 broker: Option<Arc<wps::Broker>>,
68 event_bus: Option<Arc<EventBus>>,
69 wstore: Option<Arc<WaveStore>>,
70 filestore: Option<Arc<FileStore>>,
71 health_monitor: Arc<HealthMonitor>,
72 next_rpc_id: Arc<AtomicU64>,
74}
75
76impl AcpController {
77 pub fn new(
78 tab_id: String,
79 block_id: String,
80 broker: Option<Arc<wps::Broker>>,
81 event_bus: Option<Arc<EventBus>>,
82 wstore: Option<Arc<WaveStore>>,
83 filestore: Option<Arc<FileStore>>,
84 ) -> Self {
85 let health_monitor = Arc::new(HealthMonitor::new(
86 block_id.clone(),
87 broker.clone(),
88 ));
89 Self {
90 tab_id,
91 block_id,
92 inner: Arc::new(Mutex::new(AcpInner {
93 proc_status: STATUS_INIT.to_string(),
94 proc_exit_code: 0,
95 status_version: 0,
96 session_id: None,
97 current_pid: None,
98 stdin_tx: None,
99 kill_tx: None,
100 pending_prompt: None,
101 })),
102 broker,
103 event_bus,
104 wstore,
105 filestore,
106 health_monitor,
107 next_rpc_id: Arc::new(AtomicU64::new(1)),
108 }
109 }
110
111 fn next_id(&self) -> u64 {
112 self.next_rpc_id.fetch_add(1, Ordering::Relaxed)
113 }
114
115 fn set_status(inner: &mut AcpInner, status: &str) {
116 inner.proc_status = status.to_string();
117 inner.status_version += 1;
118 }
119
120 fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus {
121 let inner = self.inner.lock().unwrap();
122 BlockControllerRuntimeStatus {
123 blockid: self.block_id.clone(),
124 version: inner.status_version,
125 shellprocstatus: inner.proc_status.clone(),
126 shellprocconnname: "local".to_string(),
127 shellprocexitcode: inner.proc_exit_code,
128 spawn_ts_ms: None,
129 is_agent_pane: true,
130 }
131 }
132
133 fn publish_status(&self) {
134 if let Some(ref broker) = self.broker {
135 let status = self.get_status_snapshot();
136 super::publish_controller_status(broker, &status);
137 }
138 }
139
140 fn is_running(&self) -> bool {
141 let inner = self.inner.lock().unwrap();
142 inner.stdin_tx.is_some()
143 }
144
145 fn make_request(&self, method: &str, params: serde_json::Value) -> String {
147 let id = self.next_id();
148 serde_json::json!({
149 "jsonrpc": "2.0",
150 "id": id,
151 "method": method,
152 "params": params,
153 }).to_string()
154 }
155
156 fn make_notification(&self, method: &str, params: serde_json::Value) -> String {
158 serde_json::json!({
159 "jsonrpc": "2.0",
160 "method": method,
161 "params": params,
162 }).to_string()
163 }
164
165 pub fn send_message(&self, message: String, cli_command: String, cli_args: Vec<String>, working_dir: String, env_vars: HashMap<String, String>) -> Result<(), String> {
169 if !self.is_running() {
170 {
173 let mut inner = self.inner.lock().unwrap();
174 inner.pending_prompt = Some(message.clone());
175 }
176 self.health_monitor.set_active_turn(true);
177 return self.spawn_process(cli_command, cli_args, working_dir, env_vars);
178 }
179
180 let session_id = {
182 let inner = self.inner.lock().unwrap();
183 inner.session_id.clone().unwrap_or_default()
184 };
185
186 let req = self.make_request("session/prompt", serde_json::json!({
187 "sessionId": session_id,
188 "prompt": {
189 "type": "text",
190 "text": message,
191 }
192 }));
193
194 self.health_monitor.set_active_turn(true);
195
196 let inner = self.inner.lock().unwrap();
197 let tx = inner.stdin_tx.as_ref()
198 .ok_or("ACP process not running after spawn")?;
199 tx.try_send(req)
200 .map_err(|e| format!("ACP stdin send failed: {e}"))
201 }
202
203 fn spawn_process(&self, cli_command: String, cli_args: Vec<String>, working_dir: String, env_vars: HashMap<String, String>) -> Result<(), String> {
205 let mut cmd = crate::server::cli_handlers::make_cli_cmd(&cli_command);
206 cmd.args(&cli_args);
207
208 if !working_dir.is_empty() {
210 let expanded_dir = if working_dir.starts_with("~/") || working_dir == "~" {
211 if let Some(home) = dirs::home_dir() {
212 home.join(working_dir.trim_start_matches("~/")).to_string_lossy().to_string()
213 } else {
214 working_dir.clone()
215 }
216 } else {
217 working_dir.clone()
218 };
219 let dir_path = std::path::Path::new(&expanded_dir);
220 if !dir_path.exists() {
221 if let Err(e) = std::fs::create_dir_all(dir_path) {
222 tracing::warn!(
223 block_id = %self.block_id,
224 dir = %expanded_dir,
225 error = %e,
226 "failed to create working directory for ACP agent"
227 );
228 }
229 }
230 if dir_path.exists() {
231 cmd.current_dir(&expanded_dir);
232 }
233 }
234
235 for (k, v) in &env_vars {
237 let expanded = crate::backend::base::expand_home_dir_safe(v);
238 cmd.env(k, expanded.to_string_lossy().as_ref());
239 }
240
241 #[cfg(windows)]
245 {
246 const CREATE_NO_WINDOW: u32 = 0x0800_0000;
247 cmd.creation_flags(CREATE_NO_WINDOW);
248 }
249
250 cmd.stdin(std::process::Stdio::piped());
251 cmd.stdout(std::process::Stdio::piped());
252 cmd.stderr(std::process::Stdio::piped());
253
254 let mut child = cmd.spawn().map_err(|e| {
255 tracing::error!(block_id = %self.block_id, error = %e, "ACP process spawn failed");
256 format!("failed to spawn ACP process: {e}")
257 })?;
258
259 let pid = child.id().unwrap_or(0);
260
261 tracing::info!(
262 block_id = %self.block_id,
263 pid = pid,
264 cmd = %cli_command,
265 args = ?cli_args,
266 "ACP agent process spawned"
267 );
268
269 let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<bool>();
270 let stdin = child.stdin.take().unwrap();
271 let stdout = child.stdout.take().unwrap();
272 let stderr = child.stderr.take();
273
274 if let Some(stderr_pipe) = stderr {
276 let block_id_stderr = self.block_id.clone();
277 tokio::spawn(async move {
278 let mut reader = BufReader::new(stderr_pipe).lines();
279 loop {
280 match reader.next_line().await {
281 Err(e) => {
282 tracing::warn!(block_id = %block_id_stderr, error = %e, "ACP stderr read error");
283 break;
284 }
285 Ok(None) => break,
286 Ok(Some(line)) => {
287 if !line.trim().is_empty() {
288 tracing::warn!(
289 block_id = %block_id_stderr,
290 line = %line,
291 "ACP agent stderr"
292 );
293 }
294 }
295 }
296 }
297 });
298 }
299
300 let (msg_tx, mut msg_rx) = mpsc::channel::<String>(32);
302
303 {
304 let mut inner = self.inner.lock().unwrap();
305 inner.current_pid = Some(pid);
306 inner.kill_tx = Some(kill_tx);
307 inner.stdin_tx = Some(msg_tx.clone());
308 Self::set_status(&mut inner, STATUS_RUNNING);
309 }
310 self.publish_status();
311
312 let block_id_stdin = self.block_id.clone();
314 tokio::spawn(async move {
315 let mut stdin = tokio::io::BufWriter::new(stdin);
316 while let Some(line) = msg_rx.recv().await {
317 if let Err(e) = stdin.write_all(line.as_bytes()).await {
318 tracing::error!(block_id = %block_id_stdin, error = %e, "ACP stdin write error");
319 break;
320 }
321 if let Err(e) = stdin.write_all(b"\n").await {
322 tracing::error!(block_id = %block_id_stdin, error = %e, "ACP stdin newline error");
323 break;
324 }
325 if let Err(e) = stdin.flush().await {
326 tracing::error!(block_id = %block_id_stdin, error = %e, "ACP stdin flush error");
327 break;
328 }
329 }
330 });
331
332 let block_id_stdout = self.block_id.clone();
334 let broker_clone = self.broker.clone();
335 let filestore_clone = self.filestore.clone();
336 let inner_clone = self.inner.clone();
337 let health_clone = self.health_monitor.clone();
338 let rpc_id_clone = self.next_rpc_id.clone();
339 tokio::spawn(async move {
340 let mut reader = BufReader::new(stdout).lines();
341 tracing::info!(block_id = %block_id_stdout, "ACP stdout_reader started");
342
343 loop {
344 let line = match reader.next_line().await {
345 Err(e) => {
346 tracing::warn!(block_id = %block_id_stdout, error = %e, "ACP stdout read error");
347 break;
348 }
349 Ok(None) => {
350 tracing::info!(block_id = %block_id_stdout, "ACP stdout EOF");
351 break;
352 }
353 Ok(Some(l)) => l,
354 };
355 if line.is_empty() {
356 continue;
357 }
358
359 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
361 if let Some(result) = json.get("result") {
363 if let Some(sid) = result.get("sessionId").and_then(|v| v.as_str()) {
364 let mut inner = inner_clone.lock().unwrap();
365 inner.session_id = Some(sid.to_string());
366 tracing::info!(
367 block_id = %block_id_stdout,
368 session_id = %sid,
369 "ACP session established"
370 );
371
372 if let Some(prompt) = inner.pending_prompt.take() {
374 let id = rpc_id_clone.fetch_add(1, Ordering::Relaxed);
375 let req = serde_json::json!({
376 "jsonrpc": "2.0",
377 "id": id,
378 "method": "session/prompt",
379 "params": {
380 "sessionId": sid,
381 "prompt": { "type": "text", "text": prompt },
382 }
383 }).to_string();
384 if let Some(ref tx) = inner.stdin_tx {
385 let _ = tx.try_send(req);
386 }
387 }
388 }
389 }
390
391 if json.get("id").is_some() && json.get("result").is_some() {
393 if let Some(result) = json.get("result") {
395 if result.get("stopReason").is_some() {
396 health_clone.set_active_turn(false);
397 }
398 }
399 }
400 }
401
402 if let Some(ref broker) = broker_clone {
404 let line_with_newline = format!("{}\n", line);
405 super::shell::handle_append_block_file(
406 broker,
407 &block_id_stdout,
408 ACP_OUTPUT_SUBJECT,
409 line_with_newline.as_bytes(),
410 filestore_clone.as_ref(),
411 );
412 }
413 }
414 });
415
416 let block_id_wait = self.block_id.clone();
418 let inner_wait = self.inner.clone();
419 let broker_wait = self.broker.clone();
420 let health_wait = self.health_monitor.clone();
421 tokio::spawn(async move {
422 tokio::select! {
423 _ = kill_rx => {
424 let _ = child.kill().await;
425 tracing::info!(block_id = %block_id_wait, "ACP process killed");
426
427 let mut inner = inner_wait.lock().unwrap();
428 inner.stdin_tx = None;
429 inner.current_pid = None;
430 AcpController::set_status(&mut inner, STATUS_DONE);
431 drop(inner);
432
433 health_wait.set_active_turn(false);
434
435 if let Some(ref broker) = broker_wait {
436 let status = BlockControllerRuntimeStatus {
437 blockid: block_id_wait.clone(),
438 version: 0,
439 shellprocstatus: STATUS_DONE.to_string(),
440 shellprocconnname: "local".to_string(),
441 shellprocexitcode: -1,
442 spawn_ts_ms: None,
443 is_agent_pane: true,
444 };
445 super::publish_controller_status(broker, &status);
446 }
447 }
448 status = child.wait() => {
449 let exit_code = status.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1);
450 tracing::info!(
451 block_id = %block_id_wait,
452 exit_code = exit_code,
453 "ACP process exited"
454 );
455 let mut inner = inner_wait.lock().unwrap();
456 inner.proc_exit_code = exit_code;
457 inner.stdin_tx = None;
458 AcpController::set_status(&mut inner, STATUS_DONE);
459 drop(inner);
460
461 health_wait.set_active_turn(false);
462
463 if let Some(ref broker) = broker_wait {
464 let status = BlockControllerRuntimeStatus {
465 blockid: block_id_wait.clone(),
466 version: 0,
467 shellprocstatus: STATUS_DONE.to_string(),
468 shellprocconnname: "local".to_string(),
469 shellprocexitcode: exit_code,
470 spawn_ts_ms: None,
471 is_agent_pane: true,
472 };
473 super::publish_controller_status(broker, &status);
474 }
475 }
476 }
477 });
478
479 let init_req = self.make_request("initialize", serde_json::json!({
481 "clientInfo": {
482 "name": "AgentMux",
483 "version": env!("CARGO_PKG_VERSION"),
484 },
485 "capabilities": {
486 "tools": true,
487 "fileAccess": true,
488 },
489 "workspaceRoots": [working_dir],
490 }));
491 let init_notification = self.make_notification("initialized", serde_json::json!({}));
492 let session_req = self.make_request("session/create", serde_json::json!({
493 "cwd": working_dir,
494 }));
495
496 let inner = self.inner.lock().unwrap();
498 if let Some(ref tx) = inner.stdin_tx {
499 let _ = tx.try_send(init_req);
500 let _ = tx.try_send(init_notification);
501 let _ = tx.try_send(session_req);
502 }
503
504 Ok(())
505 }
506}
507
508impl Controller for AcpController {
509 fn start(
510 &self,
511 block_meta: super::super::obj::MetaMapType,
512 _rt_opts: Option<serde_json::Value>,
513 _force: bool,
514 ) -> Result<(), String> {
515 let cmd = super::super::obj::meta_get_string(&block_meta, super::META_KEY_CMD, "");
517 let cwd = super::super::obj::meta_get_string(&block_meta, super::META_KEY_CMD_CWD, "");
518 let args_str = super::super::obj::meta_get_string(&block_meta, super::META_KEY_CMD_ARGS, "[]");
519 let env_str = super::super::obj::meta_get_string(&block_meta, super::META_KEY_CMD_ENV, "{}");
520
521 if cmd.is_empty() {
522 return Err("ACP controller: no cmd specified in block meta".to_string());
523 }
524
525 let args: Vec<String> = serde_json::from_str(&args_str).unwrap_or_default();
526 let env_vars: HashMap<String, String> = serde_json::from_str(&env_str).unwrap_or_default();
527
528 self.spawn_process(cmd, args, cwd, env_vars)
529 }
530
531 fn stop(&self, _graceful: bool, _new_status: &str) -> Result<(), String> {
532 {
534 let inner = self.inner.lock().unwrap();
535 if let Some(ref tx) = inner.stdin_tx {
536 let shutdown = self.make_request("shutdown", serde_json::json!({}));
537 let exit = self.make_notification("exit", serde_json::json!({}));
538 let _ = tx.try_send(shutdown);
539 let _ = tx.try_send(exit);
540 }
541 }
542
543 let kill_tx = {
545 let mut inner = self.inner.lock().unwrap();
546 inner.stdin_tx = None;
547 inner.kill_tx.take()
548 };
549 if let Some(tx) = kill_tx {
550 let _ = tx.send(true);
551 }
552
553 {
554 let mut inner = self.inner.lock().unwrap();
555 Self::set_status(&mut inner, STATUS_DONE);
556 }
557 self.publish_status();
558 Ok(())
559 }
560
561 fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
562 self.get_status_snapshot()
563 }
564
565 fn send_input(&self, input: BlockInputUnion, _seq: Option<u64>) -> Result<(), String> {
566 if let Some(data) = input.input_data {
567 let message = String::from_utf8_lossy(&data).to_string();
570 if message.trim().is_empty() {
571 return Ok(());
572 }
573
574 if !self.is_running() {
575 let mut inner = self.inner.lock().unwrap();
577 inner.pending_prompt = Some(message);
578 return Err("ACP process not running — message queued for next start()".to_string());
579 }
580
581 let session_id = {
582 let inner = self.inner.lock().unwrap();
583 inner.session_id.clone().unwrap_or_default()
584 };
585 let req = self.make_request("session/prompt", serde_json::json!({
586 "sessionId": session_id,
587 "prompt": {
588 "type": "text",
589 "text": message,
590 }
591 }));
592 self.health_monitor.set_active_turn(true);
593 let inner = self.inner.lock().unwrap();
594 if let Some(ref tx) = inner.stdin_tx {
595 tx.try_send(req)
596 .map_err(|e| format!("ACP stdin send failed: {e}"))?;
597 }
598 }
599
600 if let Some(sig) = input.sig_name {
601 if sig == "SIGTERM" || sig == "SIGINT" {
602 return self.stop(true, STATUS_DONE);
603 }
604 }
605
606 Ok(())
607 }
608
609 fn controller_type(&self) -> &str {
610 BLOCK_CONTROLLER_ACP
611 }
612
613 fn block_id(&self) -> &str {
614 &self.block_id
615 }
616
617 fn as_any(&self) -> &dyn std::any::Any {
618 self
619 }
620}